[fix](rpc) Fix AutoReleaseClosure data race with callback reuse#61782
[fix](rpc) Fix AutoReleaseClosure data race with callback reuse#61782zclllyybb wants to merge 2 commits intoapache:masterfrom
Conversation
|
/review |
|
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
|
|
run buildall |
There was a problem hiding this comment.
Pull request overview
Fixes a brpc AutoReleaseClosure race when callbacks are reused to start a new RPC (mutating response_/cntl_), by ensuring status/failure checks happen before invoking callback->call(), and adds unit tests to prevent regressions.
Changes:
- Reorders
AutoReleaseClosure::Run()so it checkscntl_/response_->status()before invokingcallback->call(). - Refactors runtime-filter sync-size RPC handling to keep the callback alive until RPC completion.
- Adds BE unit tests that simulate callback reuse mutating shared RPC state.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| be/test/exec/exchange/exchange_sink_test.cpp | Adds tests that simulate callback reuse and verify correct ordering. |
| be/src/util/brpc_closure.h | Reorders Run() and simplifies error handling paths. |
| be/src/exec/runtime_filter/runtime_filter_producer.h | Adds storage to keep sync-size callback alive across async RPC. |
| be/src/exec/runtime_filter/runtime_filter_producer.cpp | Introduces SyncSizeCallback and changes closure construction/lifetime. |
| be/src/exec/runtime_filter/runtime_filter_mgr.h | Updates _send_rf_to_target signature (removes QueryContext arg). |
| be/src/exec/runtime_filter/runtime_filter_mgr.cpp | Updates runtime-filter RPC closure construction (drops ctx passing). |
| be/src/exec/runtime_filter/runtime_filter.cpp | Updates runtime-filter RPC closure construction (drops ctx passing). |
| be/src/exec/operator/exchange_sink_buffer.cpp | Adds comments documenting callback reuse ordering constraints. |
| be/src/exec/exchange/vdata_stream_sender.h | Documents callback reuse rationale. |
Comments suppressed due to low confidence (1)
be/src/util/brpc_closure.h:1
- This change removes the previous
QueryContext-based failure propagation (anderror_msgaugmentation) fromAutoReleaseClosure, replacing it with logging only. That’s a behavior change: some call sites previously relied onAutoReleaseClosureto cancel the query on RPC failure / non-OKstatus(), gated byignore_runtime_filter_error; after this patch, errors may no longer interrupt the query and could lead to hangs or delayed failure handling. Recommendation (mandatory): restore a mechanism to propagate failures (either by reintroducing the optionalQueryContext+ captured status/failure info before callingtmp->call(), or by requiring callers to pass a callback that performs the cancel/sub/error handling) while keeping thecall()invocation as the last step to avoid the reuse race.
// Licensed to the Apache Software Foundation (ASF) under one
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Code Review Summary
PR Goal & Accomplishment
This PR fixes a real data race in AutoReleaseClosure::Run() where call() was invoked before checking cntl_->Failed() and response_->status(). Because call() can trigger callback reuse (e.g., in ExchangeSendCallback::call() -> _suc_fn -> _send_rpc() which reuses the same callback via get_send_callback()), the closure would then read mutated cntl_/response_ state from the new RPC instead of the original. The fix correctly reorders: log/check first, then call() last. The crash stacktrace and debug-log evidence clearly confirm the race.
The secondary change — removing QueryContext cancellation from AutoReleaseClosure and moving error handling into individual callbacks — is also sound. All callback types that need error handling (ExchangeSendCallback, SyncSizeCallback, WriteBlockCallback) already fully handle errors in their call() methods. The DummyBrpcCallback paths either have caller-side join()+check patterns or are fire-and-forget operations where query cancellation was too aggressive.
Compilation Bug Found
The old code had two _process_status overloads: an unconstrained no-op fallback template <typename Response> void _process_status(Response*) {} and a constrained template <HasStatus Response> void _process_status(Response*). The new code only has the constrained _log_error_status<HasStatus Response> but removed the unconstrained fallback. This will cause a compilation failure for PTabletWriterCancelResult (an empty protobuf message with no status() field), used in vtablet_writer.cpp:1229.
Critical Checkpoint Conclusions
-
Goal accomplished? Yes, the core data race fix is correct. Tests prove the ordering. However, there is a compilation bug (see inline comment).
-
Modification minimal and focused? Yes. The changes are well-scoped: core fix in
brpc_closure.h, consequent cleanup ofQueryContextremoval, andSyncSizeClosure->SyncSizeCallbackrefactor. -
Concurrency: The core fix addresses the concurrency issue correctly by ensuring
call()(which may trigger callback reuse and new RPCs) is the last operation inRun(). TheSyncSizeCallback::call()readscntl_andresponse_which are safe becausecall()is now called before any reuse can happen. -
Lifecycle management:
_sync_size_callback(asshared_ptr<void>) correctly keeps theSyncSizeCallbackalive until theRuntimeFilterProduceris destroyed. Without this,AutoReleaseClosure'sweak_ptrto the callback would expire whensend_size()returns, and the error-pathsub()would never fire. This is correct. -
Parallel code paths: The two success handler lambdas in
exchange_sink_buffer.cpp(unicast and broadcast) both have the same comment added about_send_rpcbeing last. Good. -
Test coverage: Three well-designed unit tests verify the ordering invariant by capturing state at
call()time and mutating afterward. Tests correctly fail with the old ordering and pass with the new. -
Error handling: The behavioral change from
QueryContext::cancel()toLOG(WARNING)is safe — all critical paths already handle errors in their callbacks or via caller-sidejoin()+check. -
No incompatible changes: No function symbol or storage format changes.
-
Configuration: No new configs added.
-
Observability: Error logging is preserved (downgraded from
canceltoLOG(WARNING)inAutoReleaseClosure, but callbacks handle errors appropriately).
|
/review |
|
run buildall |
There was a problem hiding this comment.
Code Review Summary
PR Goal
Fix a real data race in AutoReleaseClosure where call() was invoked before checking cntl_->Failed() and response_->status(). When call() reuses the callback (as in ExchangeSendCallback via get_send_callback()), it can trigger a new RPC that mutates response_ and cntl_, causing the subsequent reads to see the new RPC's state instead of the original — leading to SIGSEGV crashes.
Core Fix Assessment
The fix is correct and well-motivated:
- Reordering in
Run(): Log/check cntl/response state before callingcallback_->call(). This prevents the data race. - Removing
QueryContextfromAutoReleaseClosure: Decentralizes error handling to individual callbacks, which is architecturally cleaner. SyncSizeCallbackrefactoring: Moves from subclassingAutoReleaseClosureto subclassingDummyBrpcCallback, properly handling errors incall().- Unit tests: Three well-designed tests verify the ordering invariant.
Critical Checkpoint Conclusions
- Goal accomplished? Yes, the data race is fixed by reordering
call()to be last inRun(). Tests prove the ordering. - Modification minimal and focused? Mostly yes. The
QueryContextremoval is a secondary cleanup bundled with the fix. - Concurrency? The core race condition is correctly addressed. The callback reuse pattern in
ExchangeSendCallbackis safe because_send_rpc(which triggers reuse) is documented and enforced as the last operation in success handlers. - Lifecycle management?
_sync_size_callbackasshared_ptr<void>correctly keeps the callback alive. Theweak_ptrinAutoReleaseClosureallows graceful degradation if the callback owner is gone. - Parallel code paths? Both unicast and broadcast paths in
exchange_sink_buffer.cppare updated with matching comments. - Error handling? See inline comment — the
ignore_runtime_filter_error=false+ RPC failure path that previously cancelled the query now silently degrades. The existing regression testtest_inject_send_filter_size_fail.groovyexpects the old cancellation behavior and will likely fail. - Test coverage? Good unit tests for the ordering fix. Missing update to the regression test.
- Incompatible changes? No storage/protocol changes. The
AutoReleaseClosureconstructor signature change is source-breaking but all callers are updated. - Performance? No concerns — the reordering adds no overhead.
TPC-H: Total hot run time: 26928 ms |
TPC-DS: Total hot run time: 167957 ms |
BE UT Coverage ReportIncrement line coverage Increment coverage report
|
BE Regression && UT Coverage ReportIncrement line coverage Increment coverage report
|
|
/review |
There was a problem hiding this comment.
Code Review Summary
PR Goal
Fix a data race in AutoReleaseClosure::Run() where call() was invoked before checking cntl_->Failed() and response_->status(). Since call() can reuse the callback (resetting cntl_ and response_), subsequent reads of these fields in Run() would see the NEW RPC's state instead of the original, causing SIGSEGV crashes.
Core Fix Assessment
The fix is correct and well-motivated. Moving callback->call() to be the last operation in AutoReleaseClosure::Run() (after logging) eliminates the data race. The approach of removing QueryContext from AutoReleaseClosure and pushing error handling into individual callbacks is a clean architectural improvement that makes the responsibilities clearer.
Critical Checkpoint Conclusions
1. Does the code accomplish its goal?
Yes. The reordering in AutoReleaseClosure::Run() ensures cntl_ and response_ are read before call() can mutate them. The new tests prove the ordering.
2. Is the modification minimal and focused?
Mostly yes. The core fix (reordering in brpc_closure.h) is minimal. The SyncSizeCallback refactoring is a necessary consequence. However, the removal of QueryContext-based error propagation from the DummyBrpcCallback paths (merge_filter, sync_filter_size coordinator-side, publish_filter) is a behavioral side-effect that goes beyond the race fix -- see inline comment.
3. Concurrency?
Yes, this is fundamentally a concurrency fix. The critical variable is the shared cntl_/response_ state between AutoReleaseClosure and the callback. The fix ensures the closure reads before the callback writes, without requiring locks (sequential ordering within Run() suffices since the race was between the closure's own read and the callback's write-then-reuse pattern).
4. Lifecycle management?
The _sync_size_callback member in RuntimeFilterProducer correctly keeps the SyncSizeCallback alive beyond send_size() return, compensating for AutoReleaseClosure's weak_ptr storage. Good design.
5. Parallel code paths?
WriteBlockCallback (in vtablet_writer.h) follows the same pattern and is safe: its call() method handles errors internally and calls clear_in_flight() at the very end. The ExchangeSendCallback also handles errors internally in call(). Both are compatible with the new ordering.
6. Test coverage?
Good. Three new tests in exchange_sink_test.cpp verify the ordering invariant. Seven tests in sync_size_callback_test.cpp cover success, failure, ignore_runtime_filter_error, and expired weak_ptr scenarios for SyncSizeCallback.
7. Configuration changes? None.
8. Incompatible changes? None (internal refactoring only).
9. Performance? No concerns. The _log_error_status call before callback->call() is lightweight (just a LOG on error).
10. Behavioral change concern: See inline comment on runtime_filter.cpp. The removal of QueryContext from AutoReleaseClosure means that merge_filter, coordinator-side sync_filter_size, and publish_filter RPC failures no longer cancel the query when ignore_runtime_filter_error=false. This is a behavioral change beyond the race fix scope.
11. Stale comment: runtime_filter_wrapper.h:91 still references SyncSizeClosure (now renamed to SyncSizeCallback). Not in the diff, so noting here for follow-up.
Verdict
The core race fix is correct and well-tested. One behavioral concern is flagged inline regarding the loss of query cancellation on runtime filter RPC failures in non-SyncSizeCallback paths.
| ? std::weak_ptr<QueryContext> {} | ||
| : state->get_query_ctx_weak()); | ||
| create_unique(merge_filter_request, merge_filter_callback); | ||
| void* data = nullptr; |
There was a problem hiding this comment.
Behavioral change: Previously, this AutoReleaseClosure received a weak_ptr<QueryContext> (when ignore_runtime_filter_error=false), and _process_if_rpc_failed() / _process_if_meet_error_status() would cancel the query on merge_filter RPC failure.
Now with the QueryContext removed from AutoReleaseClosure, this path only logs a WARNING on failure -- the query is not cancelled. The same applies to the coordinator-side sync_filter_size and publish_filter RPCs in runtime_filter_mgr.cpp.
If this is intentional (runtime filter failures are non-fatal for correctness), it should be documented in the PR description as a deliberate behavioral change. If not, these paths need custom callbacks (like SyncSizeCallback) that respect ignore_runtime_filter_error and cancel the query when appropriate.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 10 out of 10 changed files in this pull request and generated 7 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| state->query_options().ignore_runtime_filter_error | ||
| ? std::weak_ptr<QueryContext> {} | ||
| : state->get_query_ctx_weak()); | ||
| create_unique(merge_filter_request, merge_filter_callback); |
There was a problem hiding this comment.
This RPC invocation no longer passes a QueryContext into AutoReleaseClosure, so failures/error statuses will only be logged and won’t cancel the query even when ignore_runtime_filter_error is false. Given _push_to_remote() returns Status::OK() immediately and doesn’t join/inspect the callback, this can silently drop runtime-filter propagation failures. Consider reintroducing query-cancel behavior (checked before invoking callback->call() to avoid the reuse race) or handling errors explicitly in the callback.
| create_unique(merge_filter_request, merge_filter_callback); | |
| create_unique(merge_filter_request, merge_filter_callback, | |
| state->get_query_ctx()); |
| Status status = Status::create(response_->status()); | ||
| if (!status.ok()) { | ||
| if (auto w = _wrapper.lock()) { | ||
| w->set_state(RuntimeFilterWrapper::State::DISABLED, status.to_string()); | ||
| } | ||
| if (auto ctx = _context.lock()) { | ||
| if (!ctx->ignore_runtime_filter_error()) { | ||
| ctx->cancel(Status::NetworkError("RPC meet failed: {}", status.to_string())); | ||
| } | ||
| } | ||
| ((CountedFinishDependency*)_dependency.get())->sub(); | ||
| } |
There was a problem hiding this comment.
On non-OK response status, the code cancels the query using Status::NetworkError("RPC meet failed: {}", status.to_string()). This misclassifies an application-level error status as a network error and also reuses the “RPC meet failed” wording even though the RPC itself succeeded. Consider cancelling with the actual status (or mapping to a more appropriate error code/message) so error handling/metrics reflect the real failure mode.
| // Should use weak ptr here, because when query context deconstructs, should also delete runtime filter | ||
| // context, it not the memory is not released. And rpc is in another thread, it will hold rf context | ||
| // after query context because the rpc is not returned. |
There was a problem hiding this comment.
Comment grammar is unclear: “context, it not the memory is not released.” This is hard to read and may be misleading for future maintainers.
| // Should use weak ptr here, because when query context deconstructs, should also delete runtime filter | |
| // context, it not the memory is not released. And rpc is in another thread, it will hold rf context | |
| // after query context because the rpc is not returned. | |
| // Should use a weak ptr here because when the query context is destroyed, the runtime filter | |
| // context should also be deleted; otherwise its memory may not be released. Since the RPC runs | |
| // in another thread, it may hold the runtime filter context after the query context is destroyed | |
| // if the RPC has not yet returned. |
| auto closure = AutoReleaseClosure<PPublishFilterRequestV2, | ||
| DummyBrpcCallback<PPublishFilterResponse>>:: | ||
| create_unique(std::make_shared<PPublishFilterRequestV2>(apply_request), | ||
| DummyBrpcCallback<PPublishFilterResponse>::create_shared(), ctx); | ||
| DummyBrpcCallback<PPublishFilterResponse>::create_shared()); |
There was a problem hiding this comment.
Similar to sync_filter_size, apply_filterv2 RPCs now use AutoReleaseClosure without any query-context cancellation/propagation. If these RPCs fail, the query may continue without required runtime filters (or hang waiting for them depending on downstream logic) with only a warning log. Consider adding explicit error handling (cancel query when ignore_runtime_filter_error is false, or disable the affected filter and unblock dependencies) in a callback, while keeping call() last to avoid callback reuse races.
| void Run() override { | ||
| Defer defer {[&]() { delete this; }}; | ||
| // If lock failed, it means the callback object is deconstructed, then no need | ||
| // to deal with the callback any more. | ||
| if (auto tmp = callback_.lock()) { | ||
| tmp->call(); | ||
| } | ||
| // shouldn't do heavy work here. all heavy work should be done in callback's call() (which means in success/failure handlers) | ||
| if (cntl_->Failed()) { | ||
| _process_if_rpc_failed(); | ||
| LOG(WARNING) << "brpc failed: " << cntl_->ErrorText(); | ||
| } else { | ||
| _process_status<ResponseType>(response_.get()); | ||
| _log_error_status<ResponseType>(response_.get()); | ||
| } | ||
| // this must be the LAST operation in this function, because call() may reuse the callback! (response_ is in callback_) | ||
| if (auto tmp = callback_.lock()) { | ||
| tmp->call(); | ||
| } |
There was a problem hiding this comment.
AutoReleaseClosure::Run() now only logs cntl_->Failed() / response status errors, and then invokes callback->call() last (good for avoiding the reuse race). However, this also removes the previous ability to cancel a QueryContext on RPC failure/error status, which several runtime-filter call sites previously depended on. Consider restoring an optional QueryContext/error-handling hook executed before callback->call() so you keep the ordering fix without weakening error propagation semantics across the codebase.
| // Test: Response starts OK. call() writes an error into it. | ||
| // With correct ordering (log-before-call): the closure's logging sees OK (no warning), | ||
| // then call() runs and the callback captures the OK status at call time. | ||
| // With WRONG ordering (call-before-log): call() writes error first, then the closure | ||
| // would log the error — a false positive. We verify call() saw OK at invocation time, | ||
| // proving it ran after (or at least not before) the status was checked by the closure. | ||
| TEST_F(ExchangeSinkTest, test_closure_call_sees_original_ok_response) { | ||
| auto callback = TestCallback::create_shared(); | ||
| // Response starts OK (default). | ||
| callback->set_mutate_action(TestCallback::MutateAction::WRITE_ERROR); | ||
|
|
||
| auto req = std::make_shared<PTransmitDataParams>(); | ||
| auto* closure = new AutoReleaseClosure<PTransmitDataParams, TestCallback>(req, callback); | ||
|
|
||
| closure->Run(); // self-deletes | ||
|
|
||
| EXPECT_TRUE(callback->call_invoked) << "call() should have been invoked"; | ||
| EXPECT_TRUE(callback->response_status_at_call_time.ok()) | ||
| << "call() must see the original OK response status. " | ||
| "If it saw an error, the ordering is wrong."; | ||
| EXPECT_FALSE(callback->cntl_failed_at_call_time); |
There was a problem hiding this comment.
The new tests don’t actually validate the ordering guarantee they describe. StateCapturingCallback::call() captures cntl_/response_ state before it mutates them, so it will observe the “original” state regardless of whether AutoReleaseClosure::Run() calls call() before or after checking cntl_->Failed() / response_->status(). Consider asserting an observable side-effect of the closure’s pre-call check (e.g., capture/expect the warning log output, or add a hook that records what AutoReleaseClosure observed before invoking call()).
| if (cntl_->Failed()) { | ||
| if (auto w = _wrapper.lock()) { | ||
| w->set_state(RuntimeFilterWrapper::State::DISABLED, cntl_->ErrorText()); | ||
| } | ||
| if (auto ctx = _context.lock()) { | ||
| if (!ctx->ignore_runtime_filter_error()) { | ||
| ctx->cancel(Status::NetworkError("RPC meet failed: {}", cntl_->ErrorText())); | ||
| } | ||
| } | ||
| ((CountedFinishDependency*)_dependency.get())->sub(); | ||
| return; |
There was a problem hiding this comment.
SyncSizeCallback stores _dependency as std::shared_ptr<Dependency> but then downcasts with a C-style cast to CountedFinishDependency to call sub(). This is undefined behavior if a different Dependency subtype is ever passed, and it also bypasses type safety. Prefer storing/passing std::shared_ptr<CountedFinishDependency> (or at least dynamic_cast/dynamic_pointer_cast with a CHECK) and calling sub() directly.
The callback's call() method may reuse the callback object (e.g., in vdata_stream_sender.h get_send_callback()), triggering a new RPC that mutates response_ and cntl_. If AutoReleaseClosure::Run() invokes call() before checking cntl_->Failed() or response_->status(), it reads the NEW RPC's state instead of the ORIGINAL RPC's result, causing:
we have confirmed the data race is real existing with temporary LOGs which has been removed:
and we add some be-ut which could only pass WITH this patch.
before we fix:
after: